package com.aisafer.fms.client.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aisafer.fms.client.dto.GroupReq;

/**
 * Splits file content into blocks and merges blocks back into one array.
 *
 * <p>A block is cut out of the full content by {@link #getSplitBlock}; the copy of one
 * block is itself divided into "small blocks" that are copied by worker tasks running
 * on a short-lived thread pool. {@link #mergeByteArray} is the inverse operation and
 * concatenates blocks in ascending key order.
 *
 * <p>Instances hold no mutable state: every piece of bookkeeping used while copying a
 * block (latch, progress counter, thread pool) is created per call.
 *
 * @author stevin
 */
public class Block {
	
	private static final Logger logger = LoggerFactory.getLogger(Block.class);
	
	/**
	 * Returns how many blocks of {@code blockSize} are needed to hold {@code contentSize}
	 * (ceiling division), with a minimum of one block.
	 *
	 * @param contentSize total number of bytes to split
	 * @param blockSize   size of a single block
	 * @return 1 when {@code contentSize <= blockSize}, otherwise {@code ceil(contentSize / blockSize)}
	 * @throws ArithmeticException if {@code blockSize} is 0 and {@code contentSize} is positive
	 */
	public int getBlockNum(final int contentSize,final int blockSize) {
		if(contentSize <= blockSize) {
			return 1;
		}
		final int remainder = contentSize % blockSize;
		final int fullBlocks = contentSize / blockSize;
		return remainder == 0 ? fullBlocks : fullBlocks + 1;
	}
	
	/**
	 * Returns the exclusive end offset of block number {@code currBlock} (1-based).
	 *
	 * @param currBlock 1-based block number
	 * @param blockSize size of a single block
	 * @return {@code currBlock * blockSize}, or 0 when {@code currBlock} is less than 1
	 */
	public int getCurrBlockNum(final int currBlock,final int blockSize) {
		if(currBlock <1) {
			return 0;
		}
		return currBlock * blockSize;
	}
	
	/**
	 * Returns the size of one small block: {@code blockSize} divided by the shard count
	 * of the request, rounded up.
	 *
	 * @param blockSize size of the block being copied
	 * @param groupReq  request supplying the shard count
	 * @return 1 when {@code blockSize} is not positive; {@code blockSize} when the shard
	 *         count is not positive (the block is then copied in a single piece instead
	 *         of failing with a division by zero); otherwise the rounded-up quotient
	 */
	private int getSmallBlock(final int blockSize,final GroupReq groupReq) {
		if(blockSize <= 0) {
			return 1;
		}
		final int shards = groupReq.getShardsNum();
		if(shards <= 0) {
			return blockSize;
		}
		final int quotient = blockSize / shards;
		return blockSize % shards == 0 ? quotient : quotient + 1;
	}
	
	/**
	 * Extracts block number {@code curBlock} (1-based) from {@code content}.
	 *
	 * <p>The block size comes from {@code req.getBlockSize()}; the final block is
	 * shortened to the bytes that remain.
	 *
	 * @param content  full content to cut the block from
	 * @param curBlock 1-based number of the block to extract
	 * @param req      request supplying the block size and shard count
	 * @return the bytes of the requested block; {@code content} itself when
	 *         {@code content.length <= curBlock}; an empty array when {@code curBlock}
	 *         lies past the end of the content; {@code null} when the copy failed or
	 *         the calling thread was interrupted
	 */
	public byte[] getSplitBlock(final byte[] content,final int curBlock,final GroupReq req) {
		// NOTE(review): this guard compares a byte length with a block number; it is kept
		// unchanged because callers may depend on it - confirm the intent.
		if(content.length <=curBlock) {
			return content;
		}
		int blockSize = req.getBlockSize();
		final int consumed = curBlock <= 1 ? 0 : (curBlock - 1) * blockSize;
		final int remaining = content.length - consumed;
		if(remaining <= 0) {
			// Requested block starts at or past the end of the content: nothing to copy
			// (a negative size would otherwise raise NegativeArraySizeException).
			return new byte[0];
		}
		final int currBlockNum;
		if(remaining < blockSize) {
			// Last block: shorter than the configured block size.
			blockSize = remaining;
			currBlockNum = content.length;
		}else {
			currBlockNum = getCurrBlockNum(curBlock, blockSize);
		}
		return splitBlock(content, curBlock, blockSize, currBlockNum, req);
	}
	
	/**
	 * Copies one block out of {@code content}, dividing the copy into small blocks that
	 * are executed by a thread pool created for this call and shut down before returning.
	 *
	 * @param content      full content
	 * @param curBlock     1-based block number (used for logging)
	 * @param blockSize    number of bytes in this block
	 * @param currBlockNum exclusive end offset of this block inside {@code content}
	 * @param req          request supplying the shard count
	 * @return the copied block, or {@code null} when a copy task failed or the calling
	 *         thread was interrupted while waiting
	 */
	private byte[] splitBlock(final byte[] content, final int curBlock,final int blockSize,final int currBlockNum,final GroupReq req) {
		final long begin = System.currentTimeMillis();
		final byte[] blockByte = new byte[blockSize];
		final int smallBlockSize = getSmallBlock(blockSize, req);
		final int smallBlockNum = getBlockNum(blockSize, smallBlockSize);
		final CountDownLatch answer = new CountDownLatch(smallBlockNum);
		// Per-call progress counter: a shared instance field would accumulate across
		// calls, making the completeness check meaningless and eventually overflowing.
		final AtomicInteger copied = new AtomicInteger();
		final ExecutorService executor = Executors.newFixedThreadPool(smallBlockNum);
		try {
			// Small blocks are handed out from the end of the block towards its start.
			int currSmallBlockNum = currBlockNum;
			for(int smallBlockIndexNum=1;smallBlockIndexNum<=smallBlockNum;smallBlockIndexNum++) {
				executor.execute(new BlockThread(blockByte, content, currSmallBlockNum, answer, blockSize, req, curBlock, smallBlockIndexNum, copied));
				currSmallBlockNum = currSmallBlockNum - smallBlockSize;
			}
			answer.await();
			// Each successful task adds one small-block size; any failed task leaves the
			// total below blockSize.
			final int total = copied.get();
			if(blockSize > total) {
				logger.error("blockSize:{},count:{},curBlock:{}",blockSize,total,curBlock);
				return null;
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag for the caller and do not hand back a partially
			// filled block.
			Thread.currentThread().interrupt();
			executor.shutdownNow();
			logger.error("splitBlock interrupted, curBlock:" + curBlock, e);
			return null;
		} finally {
			// Always release the worker threads; an un-shutdown pool keeps them alive forever.
			executor.shutdown();
		}
		logger.info("splitBlock.curBlock:{}.time:{}",curBlock,(System.currentTimeMillis() - begin));
		return blockByte;
	}
	
	/**
	 * Copies one small block from {@code content} into {@code blockByte}, walking
	 * backwards from the end of the small block.
	 *
	 * @param content           source bytes
	 * @param blockByte         destination array for the whole block
	 * @param currSmallBlockNum end offset of this small block inside {@code content}
	 * @param preBlockNum       lowest source offset to copy (clamped to 0)
	 * @param blockSize         number of bytes in the whole block
	 * @param curBlock          1-based block number (used for logging)
	 * @param smallBlock        1-based small-block number, counted from the end of the block
	 * @param req               request supplying the shard count
	 */
	private void appendBlockContent(final byte[] content,final byte[] blockByte,final int currSmallBlockNum,final int preBlockNum,final int blockSize,final int curBlock,final int smallBlock,GroupReq req) {
		final int preSmallBlockNum = preBlockNum <0 ? 0 : preBlockNum;
		int tempCurrSmallBlockNum = currSmallBlockNum;
		// Destination index that corresponds to source offset currSmallBlockNum.
		int smallBlockIndex = blockSize - (smallBlock-1) * getSmallBlock(blockSize,req);
		if(smallBlockIndex >= blockByte.length) {
			// First small block: the end offset is exclusive, so step both cursors back
			// onto the last valid element.
			smallBlockIndex -=1;
			tempCurrSmallBlockNum -= 1;
		}
		if(logger.isDebugEnabled()) {
			logger.debug("begin:currBlock:{},smallBlock:{},currBlockNum:{},smallBlockIndex:{},block:{},preSmallBlockNum:{}",curBlock,smallBlock,tempCurrSmallBlockNum,smallBlockIndex,blockSize,preSmallBlockNum);
		}
		int i=tempCurrSmallBlockNum;
		while(i>=preSmallBlockNum) {
			// Skip offsets outside the source and stop writing once the destination
			// cursor has run off the front of the block.
			if(i < content.length && smallBlockIndex >=0 ) {
				blockByte[smallBlockIndex] = content[i];
				smallBlockIndex--;
			}
			i--;
		}
		if(logger.isDebugEnabled()) {
			logger.debug("end:currBlock:{},smallBlock:{},currBlockNum:{},smallBlockIndex:{},block:{},preSmallBlockNum:{},i:{}",curBlock,smallBlock,tempCurrSmallBlockNum,smallBlockIndex,blockSize,preSmallBlockNum,i);
		}
	}
	
	/**
	 * Task that copies one small block and reports its progress.
	 */
	class BlockThread implements Runnable{
		private final byte[] blockByte;
		private final byte[] content;
		private final int currSmallBlockNum;
		private final CountDownLatch answer;
		private final int blockSize;
		private final GroupReq req;
		private final int  curBlock;
		private final int smallBlock;
		private final AtomicInteger copied;
		
		/**
		 * Creates a task whose progress is recorded in a private counter.
		 * Retained for callers that do not need the progress total.
		 */
		public BlockThread(final byte[] blockByte,final byte[] content,final int currSmallBlockNum,final CountDownLatch answer,final int blockSize,final GroupReq req,final int curBlock,final int smallBlock) {
			this(blockByte, content, currSmallBlockNum, answer, blockSize, req, curBlock, smallBlock, new AtomicInteger());
		}
		
		/**
		 * Creates a task that adds its small-block size to {@code copied} after a
		 * successful copy.
		 */
		BlockThread(final byte[] blockByte,final byte[] content,final int currSmallBlockNum,final CountDownLatch answer,final int blockSize,final GroupReq req,final int curBlock,final int smallBlock,final AtomicInteger copied) {
			this.blockByte = blockByte;
			this.content = content;
			this.currSmallBlockNum = currSmallBlockNum;
			this.answer = answer;
			this.blockSize = blockSize;
			this.req = req;
			this.curBlock=curBlock;
			this.smallBlock = smallBlock;
			this.copied = copied;
		}

		@Override
		public void run() {
			try {
				final int smallBlockSize = getSmallBlock(blockSize,req);
				appendBlockContent(content, blockByte, currSmallBlockNum, currSmallBlockNum-smallBlockSize,blockSize,curBlock,smallBlock,req);
				if(smallBlockSize > 0) {
					final int total = copied.addAndGet(smallBlockSize);
					if(logger.isDebugEnabled()) {
						logger.debug("in:curBlock:{},count:{},value:{}",curBlock,total,smallBlockSize);
					}
				}else {
					copied.incrementAndGet();
				}
			} catch (RuntimeException e) {
				// Progress is not recorded, so the waiting thread detects the shortfall.
				logger.error("small block copy failed, curBlock:" + curBlock + ",smallBlock:" + smallBlock, e);
			} finally {
				// Must always run; otherwise a failed task leaves await() blocked forever.
				answer.countDown();
			}
		}
	}
	
	/**
	 * Concatenates the blocks of the map in ascending key order.
	 *
	 * @param blockAttachmentMap blocks keyed by their position
	 * @return the concatenated bytes, or {@code null} when the map is {@code null}
	 */
	public byte[] mergeByteArray(Map<Integer,byte[]> blockAttachmentMap) {
		if(blockAttachmentMap == null) {
			return null;
		}
		final List<byte[]> contentList = sortMap(blockAttachmentMap);
		final byte[] contentByte = new byte[sum(contentList)];
		int offset = 0;
		for(byte[] content : contentList) {
			System.arraycopy(content, 0, contentByte, offset, content.length);
			offset += content.length;
		}
		return contentByte;
	}
	
	/**
	 * Returns the total length of all arrays in the list.
	 *
	 * @param contentList arrays to measure
	 * @return the summed lengths, or 0 when the list is {@code null}
	 */
	public int sum(List<byte[]> contentList) {
		if(contentList == null) {
			return 0;
		}
		int sum = 0;
		for(byte[] content : contentList) {
			sum += content.length;
		}
		return sum;
	}
	
	/**
	 * Returns the values of the map ordered by ascending key.
	 *
	 * @param map blocks keyed by their position
	 * @return a new list holding the map values in key order
	 */
	public List<byte[]> sortMap(Map<Integer,byte[]> map){
		final List<Map.Entry<Integer,byte[]>> infos = new ArrayList<Map.Entry<Integer,byte[]>>(map.entrySet());
		Collections.sort(infos, new Comparator<Map.Entry<Integer,byte[]>>(){
			@Override
			public int compare(Map.Entry<Integer,byte[]> o1, Map.Entry<Integer,byte[]> o2) {
				// Integer.compare avoids the overflow of subtracting the keys.
				return Integer.compare(o1.getKey(), o2.getKey());
			}
		});
		final List<byte[]> lhm = new ArrayList<>(infos.size());
		for(Map.Entry<Integer,byte[]> entry:infos){
			lhm.add(entry.getValue());
		}
		return lhm;
	}
	

}
